Why Apache Spark for Big Data?

  1. Easy to use. Provides high-level APIs that let you focus on the content of the computation.
  2. Fast, enabling interactive use and complex algorithms.
  3. General engine. Combines multiple types of computations (SQL queries, text processing, and ML).

Chapter 1: Introduction to Data Analysis with Spark

What is Apache Spark?

  1. Apache Spark is a cluster computing platform designed to be fast and general-purpose.
  2. Ability to run computation in memory.
  3. More efficient than MapReduce for complex applications.
  4. Integrates closely with other Big Data tools.

A Unified Stack

  1. Spark Core - Task scheduling, memory management, RDD API
  2. Spark SQL - Structured data
  3. Spark Streaming - Live streams of data processed in real time
  4. MLlib - Machine learning
  5. GraphX - Graph processing
  6. Cluster Managers - Standalone, YARN, Mesos

Users of Spark

  1. Data Scientist
  2. Engineer

Chapter 2: Downloading Spark and Getting Started

The Spark shell allows us to interact with data that is distributed on disk or in memory across many machines. Spark provides both Scala and Python shells.

  1. Scala shell: bin/spark-shell
  2. Python shell (PySpark): bin/pyspark

Changing the verbosity of logging in the Spark shell

Make a copy of conf/log4j.properties.template called conf/log4j.properties and find the following line:
log4j.rootCategory=INFO, console
And change it to
log4j.rootCategory=WARN, console

Working with RDDs


In [21]:
lines = sc.textFile('file:///usr/local/spark/README.md')

In [22]:
lines


Out[22]:
file:///usr/local/spark/README.md MapPartitionsRDD[24] at textFile at NativeMethodAccessorImpl.java:0

In [23]:
lines.count()


Out[23]:
104

In [24]:
lines.first()


Out[24]:
u'# Apache Spark'

Introduction to Core Spark Concepts

Every Spark application consists of a driver program that launches various parallel operations on a cluster.
Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster.
The driver program manages a number of worker processes called executors.

Standalone Applications

In standalone applications, such as scripts, we have to initialize our own SparkContext.
In Java and Scala, one has to give the application a Maven dependency on the spark-core artifact.
In Python, applications must be run using the bin/spark-submit script.

Initializing a SparkContext


In [25]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('My App')
#sc = SparkContext(conf=conf) # Spark context already running inside the IPython notebook

Chapter 3: Programming with RDDs

A Resilient Distributed Dataset (RDD) is a distributed collection of elements.
In Spark, all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

RDD Basics

Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.
RDDs can contain any type of Python, Java, or Scala objects, including user defined classes.

Creating RDD:

  1. Loading an external dataset
  2. Distributing a collection of objects in driver program.

Once created, RDDs offer two types of operations.

  1. Transformations - construct a new RDD from a previous one.
  2. Actions - compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

Spark performs transformations in a lazy fashion, i.e., transformations are only computed when an action is called.

By default, Spark RDDs are recomputed each time you run an action on them. To avoid this, persist the RDD:


In [26]:
# rdd.persist()

Every Spark application will work as follows:

  1. Create RDD
  2. Transform RDD
  3. Persist RDD
  4. Perform actions on the RDD

Creating RDDs

  1. Take an existing collection in your program and pass it to SparkContext's parallelize()

In [27]:
lines = sc.parallelize(['pandas', 'I like pandas'])
lines


Out[27]:
ParallelCollectionRDD[27] at parallelize at PythonRDD.scala:475

  2. Load data from external storage

In [28]:
lines = sc.textFile('file:///usr/local/spark/README.md')
lines


Out[28]:
file:///usr/local/spark/README.md MapPartitionsRDD[29] at textFile at NativeMethodAccessorImpl.java:0

RDD Operations

  1. Transformations - return a new RDD
  2. Actions - return a result of some other data type

Transformations

Transformed RDDs are computed lazily, only when you use them in an action.


In [29]:
inputRDD = sc.textFile('log.txt')
errorsRDD = inputRDD.filter(lambda x: 'error' in x)
warningsRDD = inputRDD.filter(lambda x: 'warning' in x)
# badLinesRDD = errorsRDD.union(warningsRDD)

Spark keeps track of the set of dependencies between different RDDs, called the lineage graph.
It uses this information to compute each RDD on demand and to recover lost data if part of a persisted RDD is lost.

Actions

Operations that return a final value to the driver program or write data to an external storage system.


In [30]:
# print 'Input had ' + str(badLinesRDD.count()) + ' concerning lines'
# print 'Here are 10 examples:'
# for line in badLinesRDD.take(10):
#     print line

RDDs also have a collect() function to retrieve the entire RDD's contents.
For large RDDs, it is better to save the contents to external storage using the saveAsTextFile() function.

Lazy Evaluation

When we call a transformation on an RDD, the operation is not immediately performed.
Think of each RDD as consisting of instructions on how to compute the data that we build up through transformations.

Passing functions to Spark

Most of Spark’s transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data.

Three options for passing functions

  1. lambda
  2. Top-level functions
  3. Locally defined functions
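
A minimal sketch of the three options, assuming lines is an RDD of strings such as the README lines loaded earlier:

# 1. lambda
errors = lines.filter(lambda x: 'error' in x)

# 2. Top-level function
def containsError(s):
    return 'error' in s
errors = lines.filter(containsError)

# 3. Locally defined function
def getErrors(rdd):
    def isError(s):
        return 'error' in s
    return rdd.filter(isError)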

Common Transformations and Actions

Element-wise transformations

  1. map() - takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD.
  2. filter() - takes in a function and returns an RDD that only has elements that pass the filter() function.
  3. flatMap() - the function we pass returns an iterator of values; rather than producing an RDD of iterators, we get back an RDD that consists of the elements from all of the iterators.

In [31]:
lines = sc.parallelize(['hello world', 'hi'])

In [32]:
words = lines.map(lambda line: line.split())
words.collect()


Out[32]:
[['hello', 'world'], ['hi']]

In [33]:
words = lines.flatMap(lambda line: line.split())
words.collect()


Out[33]:
['hello', 'world', 'hi']

Pseudo set operations

  1. rdd.distinct()
  2. rdd1.union(rdd2)
  3. rdd1.intersection(rdd2)
  4. rdd1.subtract(rdd2)
  5. rdd1.cartesian(rdd2)
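
A small sketch of these operations on two toy RDDs (expected results shown as comments; element order may vary):

rdd1 = sc.parallelize(['coffee', 'coffee', 'panda', 'monkey', 'tea'])
rdd2 = sc.parallelize(['coffee', 'monkey', 'kitty'])

rdd1.distinct().collect()          # ['coffee', 'panda', 'monkey', 'tea']
rdd1.union(rdd2).collect()         # keeps duplicates, 8 elements
rdd1.intersection(rdd2).collect()  # ['coffee', 'monkey']
rdd1.subtract(rdd2).collect()      # ['panda', 'tea']
rdd1.cartesian(rdd2).count()       # 5 * 3 = 15 pairs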

Actions

  1. reduce() - takes a function that operates on two elements of the type in your RDD and returns a new element of the same type.
  2. fold() - takes a function with the same signature as needed for reduce(), but in addition takes a “zero value” to be used for the initial call on each partition. The zero value you provide should be the identity element for your operation.
  3. aggregate() - we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.
  4. collect() - returns the entire RDD's content to the driver.
  5. take(n) - returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection.
  6. top() - extract the top elements from an RDD.
  7. takeSample(withReplacement, num, seed) - take a sample of our data either with or without replacement.
  8. foreach() - lets us perform computations on each element in the RDD without bringing it back locally.
  9. count()
  10. countByValue() - returns a map of each unique value to its count.

Note: The return type of reduce() and fold() must be the same as the type of the elements in the RDD we are operating over.
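
aggregate() (item 3 above) frees us from that constraint; a minimal sketch that computes the average of an RDD of numbers:

nums = sc.parallelize([1, 2, 3, 4])
sumCount = nums.aggregate((0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),              # add an element to the (sum, count) accumulator
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))    # merge accumulators from different partitions
avg = sumCount[0] / float(sumCount[1])   # 2.5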

Converting between RDD types

Some functions are available only on certain types of RDDs, such as mean() and variance() on numeric RDDs or join() on key/value pair RDDs.
In Scala and Java, these methods aren’t defined on the standard RDD class, so to access this additional functionality we have to make sure we get the correct specialized class.

Persistence (Caching)

To avoid computing an RDD multiple times, we can ask Spark to persist the data.


In [34]:
# rdd.persist(StorageLevel.MEMORY_ONLY)

Chapter 4: Working with Key/Value Pairs

Spark provides special operations on RDDs containing key/value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.

Creating Pair RDDs

  1. Some input formats directly return pair RDDs for their key/value data.
  2. Use map() to convert RDD into Pair RDD.

In [35]:
pairs = words.map(lambda w: (w, 1))
pairs.collect()


Out[35]:
[('hello', 1), ('world', 1), ('hi', 1)]

Transformations on Pair RDDs

  1. reduceByKey()
  2. groupByKey()
  3. combineByKey()
  4. mapValues()
  5. flatMapValues()
  6. keys()
  7. values()
  8. sortByKey()

Aggregations

  1. reduceByKey() - runs several parallel reduce operations, one for each key in the dataset, where each operation combines values that have the same key.
  2. foldByKey()
  3. combineByKey() - the most general of the per-key aggregation functions; most of the other per-key combiners are implemented using it (see the sketch after the word count example below).

In [36]:
## Word Count
rdd = sc.textFile('file:///usr/local/spark/README.md')
words = rdd.flatMap(lambda x: x.split())
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
result.take(10)


Out[36]:
[(u'storage', 1),
 (u'"local"', 1),
 (u'including', 4),
 (u'computation', 1),
 (u'file', 1),
 (u'Maven', 1),
 (u'using:', 1),
 (u'guidance', 2),
 (u'Scala,', 1),
 (u'environment', 1)]
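
A minimal sketch of combineByKey(), mentioned in the list above, computing a per-key average on toy data (output order may vary):

nums = sc.parallelize([('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)])
sumCount = nums.combineByKey(
    (lambda x: (x, 1)),                                             # createCombiner: first value seen for a key
    (lambda acc, x: (acc[0] + x, acc[1] + 1)),                      # mergeValue: add another value for the key
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))    # mergeCombiners: merge results across partitions
sumCount.mapValues(lambda acc: acc[0] / float(acc[1])).collect()
# [('panda', 0.5), ('pink', 3.5), ('pirate', 3.0)]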

Tuning the level of parallelism

Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases you will want to tune the level of parallelism for better performance.
Pass the number of partitions as an extra argument to shuffle operations, or use repartition() or coalesce() on an existing RDD.

Grouping

  1. groupByKey() - If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]].
  2. groupBy() - works on unpaired data or data where we want to use a different condition besides equality on the current key. It takes a function that it applies to every element in the source RDD and uses the result to determine the key.
  3. cogroup() - group data sharing the same key from multiple RDDs.

Joins

  1. rdd1.join(rdd2)
  2. rdd1.leftOuterJoin(rdd2)
  3. rdd1.rightOuterJoin(rdd2)
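
A small sketch on toy pair RDDs (the store data is purely illustrative; output order may vary):

store_address = sc.parallelize([('Ritual', '1026 Valencia St'), ('Philz', '748 Van Ness Ave')])
store_rating = sc.parallelize([('Ritual', 4.9), ('Starbucks', 3.8)])

store_address.join(store_rating).collect()
# [('Ritual', ('1026 Valencia St', 4.9))]
store_address.leftOuterJoin(store_rating).collect()
# [('Philz', ('748 Van Ness Ave', None)), ('Ritual', ('1026 Valencia St', 4.9))]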

Sorting data

  1. sortByKey()

Actions Available on the Pair RDDs

  1. countByKey()
  2. collectAsMap()
  3. lookup(key) - Return all values associated with the provided key.

Data Partitioning (Advanced)

Spark programs can choose to control their RDDs’ partitioning to reduce communication.
Use partitionBy() transformation at the start of the program.
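
A minimal sketch, assuming userData is a hypothetical pair RDD of (UserID, UserInfo) records that will be joined against repeatedly (the variable and path are illustrative):

# userData = sc.sequenceFile('hdfs://...')               # some large pair RDD
# partitioned = userData.partitionBy(100).persist()      # hash-partition once and keep it around
# Later joins against 'partitioned' can avoid reshuffling this dataset.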

Determining an RDD's partitioner

Use rdd.partitioner in Scala and Java to determine the partitioner

Operations that benefit from Partitioning

cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup()

Example: PageRank

Custom Partitioners

While Spark’s HashPartitioner and RangePartitioner are well suited to many use cases, Spark also allows you to tune how an RDD is partitioned by providing a custom Partitioner object.
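
In Python there is no Partitioner class to extend; instead, a sketch of the same idea passes a custom hash function to partitionBy() (links is a hypothetical pair RDD keyed by URL):

import urlparse   # Python 2; use urllib.parse in Python 3

def hash_domain(url):
    # Keys with the same domain hash to the same partition
    return hash(urlparse.urlparse(url).netloc)

# links.partitionBy(20, hash_domain)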

Chapter 5: Loading and Saving your data

Three common sets of data sources:

  1. File formats and filesystems - local or distributed filesystem.
  2. Structured data sources through Spark SQL
  3. Databases and key/value stores

File formats

Text Files

Loading:
sc.textFile() - loads a single text file as an RDD; each input line becomes an element in the RDD.
sc.wholeTextFiles() - loads multiple whole text files at the same time into a pair RDD, with the key being the file name and the value being the contents of each file.

Saving:
result.saveAsTextFile() - The path is treated as a directory and Spark will output multiple files underneath that directory.
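
A brief sketch (the paths are illustrative):

lines = sc.textFile('file:///usr/local/spark/README.md')           # one element per line
# files = sc.wholeTextFiles('file:///usr/local/spark/licenses')    # pair RDD of (filename, contents)
# lines.saveAsTextFile('file:///tmp/readme-out')                   # writes part-* files under this directory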

JSON

Loading the data as a text file and then parsing the JSON data is an approach that we can use in all of the supported languages. This works assuming that you have one JSON record per row.


In [37]:
import json
data = rdd.map(lambda x: json.loads(x))

# data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)

CSV and TSV

Loading CSV/TSV data is similar to loading JSON data in that we can first load it as text and then process it.
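
A hedged sketch that parses one CSV record per line with Python's csv module (the field names and input variable are hypothetical):

import csv
from StringIO import StringIO   # Python 2; use io.StringIO in Python 3

def loadRecord(line):
    # Parse a single CSV line into a dict
    reader = csv.DictReader(StringIO(line), fieldnames=['name', 'favouriteAnimal'])
    return next(reader)

# records = sc.textFile(inputFile).map(loadRecord)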

Sequence files

SequenceFiles are a popular Hadoop format composed of flat files with key/value pairs. SequenceFiles have sync markers that allow Spark to seek to a point in the file and then resynchronize with the record boundaries. This allows Spark to efficiently read SequenceFiles in parallel from multiple nodes.

Use the sc.sequenceFile() function to read SequenceFiles.
Use pairRDD.saveAsSequenceFile() to save a SequenceFile.
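
A minimal sketch (the paths and Writable classes are illustrative):

# Read: keys and values are converted from their Hadoop Writable classes
# data = sc.sequenceFile(inFile,
#                        'org.apache.hadoop.io.Text',
#                        'org.apache.hadoop.io.IntWritable')

# Write: a pair RDD of basic types can be saved directly
# pairs = sc.parallelize([('panda', 3), ('kay', 6)])
# pairs.saveAsSequenceFile(outFile)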

Object Files

Use sc.objectFile() to read an object file
Use rdd.saveAsObjectFile() to save an object file

In Python, use saveAsPickleFile() and pickleFile() instead.

Hadoop Input/Output formats

Use sc.hadoopFile() to load a file with the old Hadoop InputFormat API
Use sc.newAPIHadoopFile() to load a file with the new Hadoop InputFormat API
Use rdd.saveAsHadoopFile() to save an RDD with the old Hadoop OutputFormat API
Use rdd.saveAsNewAPIHadoopFile() to save an RDD with the new Hadoop OutputFormat API

Non-filesystem data sources

  1. Protocol buffers

File Compression

When working with Big Data, we often need to use compressed data to save storage space and reduce network overhead.

Filesystems

Spark supports a large number of filesystems for reading and writing to, which we can use with any of the file formats we want.

Local/Regular FS

While Spark supports loading files from the local filesystem, it requires that the files are available at the same path on all nodes in your cluster.

Amazon S3

HDFS

Structured data with Spark SQL

Spark SQL is a component to work with structured and semistructured data. By structured data, we mean data that has a schema, that is, a consistent set of fields across data records.

Apache Hive

Hive can store tables in a variety of formats, from plain text to column-oriented formats, inside HDFS or other storage systems. Spark SQL can load any table supported by Hive.

JSON

To load JSON data, first create a HiveContext as when using Hive. Then use the HiveContext.jsonFile method to get an RDD of Row objects for the whole file. Apart from using the whole Row object, you can also register this RDD as a table and select specific fields from it.
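
A sketch following the Spark 1.x API used in the book (the file path is hypothetical; newer Spark versions use spark.read.json() instead):

from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
# tweets = hiveCtx.jsonFile('tweets.json')
# tweets.registerTempTable('tweets')
# results = hiveCtx.sql('SELECT user.name, text FROM tweets')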

Databases

Java Database Connectivity

Spark can load data from any relational database that supports Java Database Connectivity (JDBC), including MySQL, Postgres, and other systems.

Cassandra

The Spark Cassandra connector is currently only available in Java and Scala.

HBase

Elasticsearch

Chapter 6: Advanced Spark Programming

Accumulators

When we normally pass functions to Spark, such as a map() function or a condition for filter(), they can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates from these copies are not propagated back to the driver.

Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program.


In [38]:
rdd = sc.textFile('file:///usr/local/spark/README.md')
blank_lines = sc.accumulator(0)

def calculateLinesLength(x):
    global blank_lines
    length = len(x)
    
    if not length:
        blank_lines += 1
        
    return length

lines_length = rdd.map(calculateLinesLength)
lines_length.collect()[:10]


Out[38]:
[14, 0, 78, 75, 73, 74, 56, 42, 0, 26]

In [39]:
blank_lines.value


Out[39]:
39

Accumulators work as follows:

  1. We create them in the driver by calling the SparkContext.accumulator(initialValue) method, which produces an accumulator holding an initial value. The return type is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue.
  2. Worker code in Spark closures can add to the accumulator with its += method (or add in Java).
  3. The driver program can call the value property on the accumulator to access its value (or call value() and setValue() in Java).

Accumulators and Fault Tolerance

Note that tasks on worker nodes cannot access the accumulator’s value()—from the point of view of these tasks, accumulators are write-only variables. This allows accumulators to be implemented efficiently, without having to communicate every update.
The end result is that for accumulators used in actions, Spark applies each task’s update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach().
For accumulators used in RDD transformations instead of actions, this guarantee does not exist.

Custom Accumulators

Custom accumulators need to extend AccumulatorParam. Beyond adding to a numeric value, we can use any operation for add, provided that operation is commutative and associative.


In [40]:
from pyspark.accumulators import AccumulatorParam

class CustomAccumulatorParam(AccumulatorParam):
    def zero(self, initial):
        # The zero value used to initialize each task's local accumulator
        return initial
    
    def addInPlace(self, data1, data2):
        # How two accumulated values are merged (here, list concatenation)
        data1 += data2
        return data1
    
accum = sc.accumulator([], CustomAccumulatorParam())
accum


Out[40]:
Accumulator<id=2, value=[]>

Broadcast Variables

Broadcast variables allow the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations.


In [41]:
# signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    # lookupCountry() comes from the book's call-sign example and is not defined here
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

# countryContactCounts = contactCounts.map(processSignCount)

The process of using broadcast variables:

  1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
  2. Access its value with the value property (or value() method in Java).
  3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).

Optimizing Broadcasts

When we are broadcasting large values, it is important to choose a data serialization format that is both fast and compact, because the time to send the value over the network can quickly become a bottleneck if it takes a long time to either serialize a value or to send the serialized value over the network.

Working on a Per-Partition Basis

Working with data on a per-partition basis allows us to avoid redoing setup work for each data item. Operations like opening a database connection or creating a random-number generator are examples of setup steps that we wish to avoid doing for each element. Spark has per-partition versions of map and foreach to help reduce the cost of these operations by letting you run code only once for each partition of an RDD.

Use mapPartitions() function, which gives us an iterator of the elements in each partition of the input RDD and expects us to return an iterator of our results.
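
A minimal sketch that computes an average with mapPartitions(), building one (sum, count) pair per partition instead of one per element:

def partitionCtr(nums):
    # Compute (sum, count) for the numbers in one partition
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

nums = sc.parallelize([1, 2, 3, 4], 2)
sumCount = nums.mapPartitions(partitionCtr).reduce(
    lambda a, b: [a[0] + b[0], a[1] + b[1]])
avg = sumCount[0] / float(sumCount[1])   # 2.5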

Piping to External Programs

Spark provides a general mechanism to pipe data to programs in other languages, like R scripts. Spark provides a pipe() method on RDDs. Spark’s pipe() lets us write parts of jobs using any language we want as long as it can read and write to Unix standard streams.
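
A tiny sketch, assuming a Unix tr command is available on the workers:

# Each element is written to the command's stdin as one line, and each output
# line becomes an element of the resulting RDD.
sc.parallelize(['hello world', 'hi']).pipe('tr a-z A-Z').collect()
# ['HELLO WORLD', 'HI']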

Numeric RDD Operations

  1. count()
  2. mean()
  3. sum()
  4. max()
  5. min()
  6. variance()
  7. sampleVariance()
  8. stdev()
  9. sampleStdev()
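
These are available all at once via stats(), or individually; a brief sketch:

nums = sc.parallelize([1.0, 2.0, 3.0, 4.0])
stats = nums.stats()      # a StatCounter holding count, mean, stdev, max, min
stats.mean()              # 2.5
nums.variance()           # 1.25
nums.sampleStdev()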

Chapter 7: Running on a cluster

Spark can run on a wide variety of cluster managers:

  1. Hadoop YARN
  2. Apache Mesos
  3. Standalone cluster manager

Spark Runtime Architecture

In distributed mode, Spark uses a master/slave architecture with one central coordinator and many distributed workers.
The central coordinator is called the driver.
The driver communicates with a potentially large number of distributed workers called executors.
The driver runs in its own Java process and each executor is a separate Java process.
A driver and its executors are together termed a Spark application.
A Spark application is launched on a set of machines using an external service called a cluster manager.

The Driver

The driver is the process where the main() method of your program runs. It is the process running the user code that creates a SparkContext, creates RDDs, and performs transformations and actions.

When the driver runs, it performs two duties:

  1. Converting a user program into tasks
  2. Scheduling tasks on executors

The driver exposes information about the running Spark application through a web interface, which by default is available at port 4040.

Executors

Spark executors are worker processes responsible for running the individual tasks in a given Spark job.

Cluster Manager

The cluster manager is a pluggable component in Spark. This allows Spark to run on top of different external managers, such as YARN and Mesos, as well as its built-in Standalone cluster manager.

Launching a Program

Spark provides a single script, called spark-submit, that you can use to submit your program to a cluster.

Summary

  1. The user submits an application using spark-submit.
  2. spark-submit launches the driver program and invokes the main() method specified by the user.
  3. The driver program contacts the cluster manager to ask for resources to launch executors.
  4. The cluster manager launches executors on behalf of the driver program.
  5. The driver process runs through the user application. Based on the RDD actions and transformations in the program, the driver sends work to executors in the form of tasks.
  6. Tasks are run on executor processes to compute and save results.
  7. If the driver’s main() method exits or it calls SparkContext.stop(), it will terminate the executors and release resources from the cluster manager.

Deploying Applications with spark-submit

bin/spark-submit [options] {app jar | python file} [app options]
[options] are a list of flags for spark-submit. You can enumerate all possible flags by running spark-submit --help.
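
For example, a submission with a few common flags (the paths and values are illustrative):

bin/spark-submit --master spark://masternode:7077 \
  --deploy-mode client \
  --executor-memory 2g \
  --name "My App" \
  my_script.py "options for my app"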

Packaging your code with dependencies

Since PySpark uses the existing Python installation on worker machines, you can install dependency libraries directly on the cluster machines using standard Python package managers (such as pip or easy_install), or via a manual installation into the site-packages/ directory of your Python installation. Alternatively, you can submit individual libraries using the --py-files argument to spark-submit and they will be added to the Python interpreter’s path.

For Java and Scala, it’s common practice to rely on a build tool to produce a single large JAR containing the entire transitive dependency graph of an application. The most popular build tools for Java and Scala are Maven and sbt (Scala build tool).

Cluster Managers

  1. Standalone Cluster Manager
  2. Hadoop YARN
  3. Apache Mesos

Standalone Cluster Manager

Launching the Standalone cluster manager

To use the cluster launch scripts, follow these steps:

  1. Copy a compiled version of Spark to the same location on all your machines—for example, /home/yourname/spark.
  2. Set up password-less SSH access from your master machine to the others. This requires having the same user account on all the machines, creating a private SSH key for it on the master via ssh-keygen, and adding this key to the .ssh/ authorized_keys file of all the workers.
  3. Edit the conf/slaves file on your master and fill in the workers’ hostnames.
  4. To start the cluster, run sbin/start-all.sh on your master (it is important to run it there rather than on a worker). If everything started, you should get no prompts for a password, and the cluster manager’s web UI should appear at http://masternode:8080 and show all your workers.
  5. To stop the cluster, run sbin/stop-all.sh on your master node.

Submitting applications

To submit an application to the Standalone cluster manager, pass spark://masternode:7077 as the master argument to spark-submit.

spark-submit --master spark://masternode:7077 yourapp

Two deploy modes:

  1. Client mode - In client mode (the default), the driver runs on the machine where you executed spark-submit, as part of the spark-submit command. This means that you can directly see the output of your driver program, or send input to it (e.g., for an interactive shell), but it requires the machine from which your application was submitted to have fast connectivity to the workers and to stay available for the duration of your application.
  2. Cluster mode - the driver is launched within the Standalone cluster, as another process on one of the worker nodes, and then it connects back to request executors. In this mode spark-submit is “fire-and-forget” in that you can close your laptop while the application is running. You will still be able to access logs for the application through the cluster manager’s web UI.

Configuring resource usage

Resource allocation is controlled by two settings:

  1. Executor memory - Each application will have at most one executor on each worker, so this setting controls how much of that worker’s memory the application will claim.
  2. The maximum number of total cores - This is the total number of cores used across all executors for an application.

High availability

When running in production settings, you will want your Standalone cluster to be available to accept applications even if individual nodes in your cluster go down. Out of the box, the Standalone mode will gracefully support the failure of worker nodes. If you also want the master of the cluster to be highly available, Spark supports using Apache ZooKeeper (a distributed coordination system) to keep multiple standby masters and switch to a new one when any of them fails.

Hadoop YARN

Running Spark on YARN in these environments is useful because it lets Spark access HDFS data quickly, on the same nodes where the data is stored.
Using YARN in Spark is straightforward: you set an environment variable that points to your Hadoop configuration directory, then submit jobs to a special master URL with spark-submit.

spark-submit --master yarn yourapp

Apache Mesos

Apache Mesos is a general-purpose cluster manager that can run both analytics workloads and long-running services (e.g., web applications or key/value stores) on a cluster.

To use Spark on Mesos, pass a mesos:// URI to spark-submit:
spark-submit --master mesos://masternode:5050 yourapp

Amazon EC2

Spark comes with a built-in script to launch clusters on Amazon EC2. This script launches a set of nodes and then installs the Standalone cluster manager on them.
The Spark EC2 script is called spark-ec2, and is located in the ec2 folder of your Spark installation.

Chapter 8: Tuning and Debugging Spark

Spark is designed so that default settings work “out of the box” in many cases; however, there are still some configurations users might want to modify.

Configuring Spark with SparkConf

  1. SparkConf class - A SparkConf instance is required when you are creating a new SparkContext. A SparkConf instance contains key/value pairs of configuration options the user would like to override. Every configuration option in Spark is based on a string key and value. To use a SparkConf object you create one, call set() to add configuration values, and then supply it to the SparkContext constructor.
  2. spark-submit tool - When an application is launched with spark-submit, it injects configuration values into the environment. These are detected and automatically filled in when a new SparkConf is constructed.
  3. spark-defaults.conf - spark-submit will look for a file called conf/spark-defaults.conf in the Spark directory and attempt to read whitespace-delimited key/value pairs from this file. You can also customize the exact location of the file using the --properties-file flag to spark-submit.

The highest priority is given to configurations declared explicitly in the user’s code using the set() function on a SparkConf object. Next are flags passed to spark-submit, then values in the properties file, and finally default values.
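
A minimal sketch of configuring values directly on a SparkConf with string keys (the port value is arbitrary):

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set('spark.app.name', 'My Spark App')
conf.set('spark.master', 'local[4]')
conf.set('spark.ui.port', '36000')   # override the default web UI port

# sc = SparkContext(conf=conf)   # a context is already running in this notebook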

Components of Execution: Jobs, Tasks, and Stages

To display the lineage of an RDD, Spark provides a toDebugString() method.

The following phases occur during Spark execution:

  1. User code defines a DAG (directed acyclic graph) of RDDs - Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph.
  2. Actions force translation of the DAG to an execution plan - When you call an action on an RDD it must be computed. This requires computing its parent RDDs as well. Spark’s scheduler submits a job to compute all needed RDDs. That job will have one or more stages, which are parallel waves of computation composed of tasks. Each stage will correspond to one or more RDDs in the DAG. A single stage can correspond to multiple RDDs due to pipelining.
  3. Tasks are scheduled and executed on a cluster - Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage is finished in a job, the action is complete.

Finding Information

Spark records detailed progress information and performance metrics as applications execute. These are presented to the user in two places: the Spark web UI and the logfiles produced by the driver and executor processes.

Spark Web UI

Spark’s built-in web UI - This is available on the machine where the driver is running at port 4040 by default. In the case of the YARN cluster mode, where the application driver runs inside the cluster, you should access the UI through the YARN ResourceManager, which proxies requests directly to the driver.

It helps in debugging following things:

  1. Jobs: Progress and metrics of stages, tasks, and more
  2. Storage: Information for RDDs that are persisted
  3. Executors: A list of executors present in the application
  4. Environment: Debugging Spark’s configuration

Driver and Executor Logs

Spark’s logging subsystem is based on log4j, a widely used Java logging library, and uses log4j’s configuration format. An example log4j configuration file is bundled with Spark at conf/log4j.properties.template. To customize Spark’s logging, first copy the example to a file called log4j.properties.

Key Performance Considerations

Level of Parallelism

Spark offers two ways to tune the degree of parallelism for operations:

  1. The first is that, during operations that shuffle data, you can always give a degree of parallelism for the produced RDD as a parameter.
  2. The second is that any existing RDD can be redistributed to have more or fewer partitions. The repartition() operator will randomly shuffle an RDD into the desired number of partitions. If you know you are shrinking the RDD, you can use the coalesce() operator; this is more efficient than repartition() since it avoids a shuffle operation.
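
A brief sketch of both approaches:

words = sc.textFile('file:///usr/local/spark/README.md').flatMap(lambda x: x.split())
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y, 10)   # explicit parallelism for the shuffle
counts.getNumPartitions()        # 10
fewer = counts.coalesce(5)       # shrink without a full shuffle
fewer.getNumPartitions()         # 5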

Serialization Format

When Spark is transferring data over the network or spilling data to disk, it needs to serialize objects into a binary format. Spark also supports the use of Kryo, a third-party serialization library that improves on Java’s serialization by offering both faster serialization times and a more compact binary representation, but cannot serialize all types of objects “out of the box.” Almost all applications will benefit from shifting to Kryo for serialization.
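
Kryo is enabled through configuration; a minimal sketch (note that PySpark pickles Python records, so this setting mainly affects JVM-internal data):

conf = SparkConf()
conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
# For the most compact encoding, Scala/Java applications can also register classes:
# conf.set('spark.kryo.registrationRequired', 'true')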

Memory Management

Inside of each executor, memory is used for a few purposes:

  1. RDD storage
  2. Shuffle and aggregation buffers
  3. User code

By default Spark will leave 60% of space for RDD storage, 20% for shuffle memory, and the remaining 20% for user programs. In some cases users can tune these options for better performance.

Hardware Provisioning

The main parameters that affect cluster sizing are the amount of memory given to each executor, the number of cores for each executor, the total number of executors, and the number of local disks to use for scratch data.

Spark applications will benefit from having more memory and cores. Spark’s architecture allows for linear scaling; adding twice the resources will often make your application run twice as fast.

Chapter 9: Spark SQL

Spark SQL is Spark’s interface for working with structured and semistructured data. Structured data is any data that has a schema, that is, a known set of fields for each record.

Spark SQL provides three main capabilities:

  1. It can load data from a variety of structured sources (e.g., JSON, Hive, and Parquet).
  2. It lets you query the data using SQL, both inside a Spark program and from external tools that connect to Spark SQL through standard database connectors (JDBC/ODBC), such as business intelligence tools like Tableau.
  3. When used within a Spark program, Spark SQL provides rich integration between SQL and regular Python/Java/Scala code, including the ability to join RDDs and SQL tables, expose custom functions in SQL, and more. Many jobs are easier to write using this combination.

Using Spark SQL in Applications

The most powerful way to use Spark SQL is inside a Spark application. This gives us the power to easily load data and query it with SQL while simultaneously combining it with “regular” program code in Python, Java, or Scala.

Loading and Saving Data

  1. Apache Hive
  2. Parquet
  3. JSON
  4. From RDDs

User-Defined functions

User-defined functions, or UDFs, allow you to register custom functions in Python, Java, and Scala to call within SQL.

Spark SQL offers a built-in method to easily register UDFs by passing in a function in your programming language. In Scala and Python, we can use the native function and lambda syntax of the language, and in Java we need only extend the appropriate UDF class. In Python and Java we also need to specify the return type.

Chapter 10: Spark Streaming

Spark Streaming provides an abstraction called DStreams, or discretized streams. A DStream is a sequence of data arriving over time. Internally, each DStream is represented as a sequence of RDDs arriving at each time step (hence the name “discretized”). DStreams can be created from various input sources, such as Flume, Kafka, or HDFS. Once built, they offer two types of operations: transformations, which yield a new DStream, and output operations, which write data to an external system. DStreams provide many of the same operations available on RDDs, plus new operations related to time, such as sliding windows.

A Simple Example

  1. We will start by creating a StreamingContext, which is the main entry point for streaming functionality. This also sets up an underlying SparkContext that it will use to process the data. It takes as input a batch interval specifying how often to process new data, which we set to 1 second.
  2. Next, we use socketTextStream() to create a DStream based on text data received on port 7777 of the local machine.
  3. Do something with the input
  4. To start receiving data, we must explicitly call start() on the StreamingContext.
  5. To keep our application from exiting, we also need to call awaitTermination() to wait for the streaming computation to finish. A sketch of these steps follows.
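
A sketch of these steps in Python (something must be writing to port 7777, e.g. nc -lk 7777):

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)                         # 1-second batch interval
lines = ssc.socketTextStream('localhost', 7777)       # DStream of lines from the socket
errorLines = lines.filter(lambda line: 'error' in line)
errorLines.pprint()                                   # print a few elements of each batch

# ssc.start()               # start receiving and processing data
# ssc.awaitTermination()    # block until the streaming computation is stopped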

Architecture and Abstraction

  1. Spark Streaming uses a “micro-batch” architecture, where the streaming computation is treated as a continuous series of batch computations on small batches of data. Spark Streaming receives data from various input sources and groups it into small batches. New batches are created at regular time intervals.
  2. Output operations are similar to RDD actions in that they write data to an external system, but in Spark Streaming they run periodically on each time step, producing output in batches.
  3. For each input source, Spark Streaming launches receivers, which are tasks running within the application’s executors that collect data from the input source and save it as RDDs.
  4. Spark Streaming also includes a mechanism called checkpointing that saves state periodically to a reliable filesystem (e.g., HDFS or S3). Typically, you might set up checkpointing every 5–10 batches of data.

Transformations

Transformations on DStreams can be grouped into either stateless or stateful:

  1. Stateless transformations - the processing of each batch does not depend on the data of its previous batches.
  2. Stateful transformations - use data or intermediate results from previous batches to compute the results of the current batch.

Stateless transformations

Many of the RDD transformations are also available on DStreams.
Stateless transformations can also combine data from multiple DStreams, again within each time step.
The transform() operation lets you provide any arbitrary RDD-to-RDD function to act on the DStream.

Stateful transformations

Stateful transformations are operations on DStreams that track data across time; that is, some data from previous batches is used to generate the results for a new batch.

The two main types:

  1. windowed operations, which act over a sliding window of time periods.
  2. updateStateByKey(), which is used to track state across events for each key (e.g., to build up an object representing each user session).

Stateful transformations require checkpointing to be enabled in your StreamingContext for fault tolerance.

Windowed transformations

Windowed operations compute results across a longer time period than the StreamingContext’s batch interval, by combining results from multiple batches.

All windowed operations need two parameters:

  1. Window duration - controls how many previous batches of data are considered.
  2. Sliding duration - controls how frequently the new DStream computes results.

Both must be a multiple of the StreamingContext’s batch interval.

The simplest window operation we can do on a DStream is window().

Spark Streaming provides a number of other windowed operations for efficiency and convenience:

  1. reduceByWindow()
  2. reduceByKeyAndWindow()
  3. countByWindow()
  4. countByValueAndWindow()
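
For example, a hedged sketch of reduceByKeyAndWindow() counting events per key over a 30-second window that slides every 10 seconds (ipPairDStream is a hypothetical DStream of (ipAddress, 1) pairs):

# ipCountDStream = ipPairDStream.reduceByKeyAndWindow(
#     lambda x, y: x + y,    # add counts from batches entering the window
#     lambda x, y: x - y,    # subtract counts from batches leaving the window
#     30,                    # window duration (seconds)
#     10)                    # slide duration (seconds)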

UpdateStateByKey transformation

To maintain state across the batches in a DStream, updateStateByKey() provides access to a state variable for DStreams of key/value pairs. Given a DStream of (key, event) pairs, it lets you construct a new DStream of (key, state) pairs by taking a function that specifies how to update the state for each key given new events.

To use updateStateByKey(), we provide a function update(events, oldState) that takes in the events that have arrived for a key and its previous state, and returns a newState to store for it.
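
A minimal sketch that keeps a running count per key (responseCodeDStream is a hypothetical DStream of (responseCode, 1) pairs):

def updateRunningCount(newValues, runningCount):
    # newValues: values that arrived for this key in the current batch
    # runningCount: previous state for this key, or None the first time
    return (runningCount or 0) + sum(newValues)

# responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningCount)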

Output Operations

Output operations specify what needs to be done with the final transformed data in a stream (e.g., pushing it to an external database or printing it to the screen).

Much like lazy evaluation in RDDs, if no output operation is applied on a DStream and any of its descendants, then those DStreams will not be evaluated. And if there are no output operations set in a StreamingContext, then the context will not start.

Input Sources

Spark Streaming has built-in support for a number of different data sources.

  1. Core Sources
  2. Additional Sources

Core Sources

  1. Sockets
  2. Files
  3. Akka actors

Additional Sources

  1. Apache Kafka
  2. Apache Flume
  3. Apache Kinesis
  4. Twitter
  5. ZeroMQ

Multiple Sources and Cluster Sizing

Each receiver runs as a long-running task within Spark’s executors, and hence occupies CPU cores allocated to the application. In addition, there need to be available cores for processing the data. This means that in order to run multiple receivers, you should have at least as many cores as the number of receivers, plus however many are needed to run your computation.

24/7 Operation

To run Spark Streaming applications 24/7, you need some special setup:

  1. Checkpointing
  2. Driver/Worker/Receiver Fault Tolerance

Checkpointing

It allows Spark Streaming to periodically save data about the application to a reliable storage system, such as HDFS or Amazon S3.

Checkpointing serves two purposes:

  1. Limiting the state that must be recomputed on failure.
  2. Providing fault tolerance for the driver.

Driver Fault Tolerance

Instead of simply calling new StreamingContext, we need to use the StreamingContext.getOrCreate() function.

Performance Considerations

Spark Streaming applications have a few specialized tuning options:

  1. Batch and Window Sizes
  2. Level of Parallelism
  3. Garbage Collection and Memory Usage

Batch and Window Sizes

The best approach is to start with a larger batch size (around 10 seconds) and work your way down to a smaller batch size. If the processing times reported in the Streaming UI remain consistent, then you can continue to decrease the batch size, but if they are increasing you may have reached the limit for your application. In a similar way, for windowed operations, the interval at which you compute a result (i.e., the slide interval) has a big impact on performance. Consider increasing this interval for expensive computations if it is a bottleneck.

Level of Parallelism

  1. Increasing the number of receivers
  2. Explicitly repartitioning received data
  3. Increasing parallelism in aggregation

Garbage Collection and Memory Usage

We can control the GC by adding -XX:+UseConcMarkSweepGC to the spark.executor.extraJavaOptions configuration option.

Chapter 11: Machine Learning with MLlib

MLlib is best suited for running each algorithm on a large dataset. If you instead have many small datasets on which you want to train different learning models, it would be better to use a single-node learning library (e.g., Weka or scikit-learn) on each node, perhaps calling it in parallel across nodes using a Spark map().

Data types

  1. Vector
  2. LabeledPoint
  3. Rating
  4. Various Model classes

Working with Vectors

  1. Dense vector
  2. Sparse vector

Sparse vectors are usually preferable (both in terms of memory use and speed) if at most 10% of elements are nonzero.
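
A brief sketch of creating both kinds of vectors:

from pyspark.mllib.linalg import Vectors

dense = Vectors.dense([1.0, 2.0, 0.0, 0.0])
# Sparse: vector size 4, with nonzero entries at positions 0 and 2
sparse = Vectors.sparse(4, {0: 1.0, 2: 3.0})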

Algorithms

Feature Extraction

  1. TF-IDF - MLlib has two algorithms that compute TF-IDF: HashingTF and IDF
  2. Scaling - use the StandardScaler class in MLlib to do this scaling, so that all features have a mean of 0 and a standard deviation of 1.
  3. Normalization - normalizing vectors to length 1 is also useful to prepare input data. The Normalizer class allows this; simply use Normalizer().transform(rdd).
  4. Word2Vec - a featurization algorithm for text based on neural networks that can be used to feed data into many downstream algorithms.
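
For instance, a minimal TF sketch with HashingTF (the feature count of 10,000 is arbitrary):

from pyspark.mllib.feature import HashingTF

sentence = 'hello hello world'
tf = HashingTF(10000)               # hash terms into a 10,000-dimensional vector
tf.transform(sentence.split())      # SparseVector of term frequencies

# For a whole corpus, transform an RDD of term lists:
# tfVectors = tf.transform(rdd.map(lambda line: line.split()))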

Statistics

  1. Statistics.colStats(rdd) - Computes a statistical summary of an RDD of vectors, which stores the min, max, mean, and variance for each column in the set of vectors.
  2. Statistics.corr(rdd, method) - Computes the correlation matrix between columns in an RDD of vectors, using either the Pearson or Spearman correlation
  3. Statistics.corr(rdd1, rdd2, method) - Computes the correlation between two RDDs of floating-point values, using either the Pearson or Spearman correlation
  4. Statistics.chiSqTest(rdd) - Computes Pearson’s independence test for every feature with the label on an RDD of LabeledPoint objects.

Classification and Regression

  1. Linear Regression - The linear regression algorithms are available through the mllib.regression.LinearRegressionWithSGD, LassoWithSGD, and RidgeRegressionWithSGD classes.
  2. Logistic Regression - Logistic regression is a binary classification method that identifies a linear separating plane between positive and negative examples.
  3. Support Vector Machines
  4. Naive Bayes
  5. Decision Trees and Random Forests
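
A toy sketch of training a classifier on LabeledPoint data (the two-point training set is purely illustrative):

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

points = sc.parallelize([
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
])
model = LogisticRegressionWithSGD.train(points, iterations=100)
model.predict([1.0, 0.0])   # expected to be 1 for this toy data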

Clustering

  1. K-means

Collaborative Filtering and Recommendation

  1. Alternating Least Squares

Dimensionality Reduction

  1. Principal component analysis
  2. Singular value decomposition

Model Evaluation

  1. BinaryClassificationMetrics
  2. MulticlassMetrics